package cn.wolfcode.service.impl;

import cn.wolfcode.common.exception.BusinessException;
import cn.wolfcode.common.web.CodeMsg;
import cn.wolfcode.common.web.Result;
import cn.wolfcode.domain.Product;
import cn.wolfcode.domain.SeckillProduct;
import cn.wolfcode.domain.SeckillProductVo;
import cn.wolfcode.feign.ProductFeignApi;
import cn.wolfcode.mapper.SeckillProductMapper;
import cn.wolfcode.redis.SeckillRedisKey;
import cn.wolfcode.service.ISeckillProductService;
import cn.wolfcode.util.AssertUtils;
import cn.wolfcode.util.IdGenerateUtil;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Slf4j
@Service
@CacheConfig(cacheNames = "SeckillProduct")
public class SeckillProductServiceImpl implements ISeckillProductService {
    private final SeckillProductMapper seckillProductMapper;
    private final StringRedisTemplate redisTemplate;
    private final ProductFeignApi productFeignApi;
    private final RedisScript<Boolean> redisScript;
    @Autowired
    private ScheduledExecutorService scheduledExecutorService;

    public SeckillProductServiceImpl(SeckillProductMapper seckillProductMapper, StringRedisTemplate redisTemplate, ProductFeignApi productFeignApi, RedisScript<Boolean> redisScript) {
        this.seckillProductMapper = seckillProductMapper;
        this.redisTemplate = redisTemplate;
        this.productFeignApi = productFeignApi;
        this.redisScript = redisScript;
    }
//    @Autowired
//    private RocketMQTemplate rocketMQTemplate;

    @Override
    public List<SeckillProductVo> selectTodayListByTime(Integer time) {
        // 1. 调用秒杀服务接口, 基于今天的时间, 查询今天的所有秒杀商品数据
        List<SeckillProduct> todayList = seckillProductMapper.queryCurrentlySeckillProduct(time);
        // 2. 遍历秒杀商品列表, 得到商品 id 列表
        List<Long> productIdList = todayList.stream() // Stream<SeckillProduct>
                .map(SeckillProduct::getProductId) // SeckillProduct => Long
                .distinct()
                .collect(Collectors.toList());
        // 3. 根据商品 id 列表, 调用商品服务查询接口, 得到商品列表
        Result<List<Product>> result = productFeignApi.selectByIdList(productIdList);
        /**
         * result 可能存在的几种情况:
         *  1. 远程接口正常返回, code == 200, data == 想要的数据
         *  2. 远程接口出现异常, code != 200
         *  3. 接口被熔断降级, data == null
         */
        if (result.hasError() || result.getData() == null) {
            throw new BusinessException(new CodeMsg(result.getCode(), result.getMsg()));
        }

        List<Product> products = result.getData();

        // 4. 遍历秒杀商品列表, 将商品对象与秒杀商品对象聚合到一起
        // List<SeckillProduct> => List<SeckillProductVo>
        List<SeckillProductVo> productVoList = todayList.stream()
                .map(sp -> {
                    SeckillProductVo vo = new SeckillProductVo();
                    BeanUtils.copyProperties(sp, vo);

                    List<Product> list = products.stream().filter(p -> sp.getProductId().equals(p.getId())).collect(Collectors.toList());
                    if (list.size() > 0) {
                        Product product = list.get(0);
                        BeanUtils.copyProperties(product, vo);
                    }
                    vo.setId(sp.getId());

                    return vo;
                }) // Stream<SeckillProductVo>
                .collect(Collectors.toList());

        return productVoList;
    }

    @Override
    public List<SeckillProductVo> selectTodayListByTimeFromRedis(Integer time) {
        String key = SeckillRedisKey.SECKILL_PRODUCT_LIST.join(time + "");
        List<String> stringList = redisTemplate.opsForList().range(key, 0, -1);

        if (stringList == null || stringList.size() == 0) {
            log.warn("[秒杀商品] 查询秒杀商品列表异常, Redis 中没有数据, 从 DB 中查询...");
            return this.selectTodayListByTime(time);
        }

        return stringList.stream().map(json -> JSON.parseObject(json, SeckillProductVo.class)).collect(Collectors.toList());
    }

    @Override
    @Cacheable(key = "'selectByIdAndTime:' + #seckillId")
    public SeckillProductVo selectByIdAndTime(Long seckillId, Integer time) {
        SeckillProduct seckillProduct = seckillProductMapper.selectByIdAndTime(seckillId, time);

        Result<List<Product>> result = productFeignApi.selectByIdList(Collections.singletonList(seckillProduct.getProductId()));
        if (result.hasError() || result.getData() == null || result.getData().size() == 0) {
            throw new BusinessException(new CodeMsg(result.getCode(), result.getMsg()));
        }

        Product product = result.getData().get(0);

        SeckillProductVo vo = new SeckillProductVo();
        // 先将商品的属性 copy 到 vo 对象中
        BeanUtils.copyProperties(product, vo);

        // 再将秒杀商品的属性 copy 到 vo 对象中, 并覆盖 id 属性
        BeanUtils.copyProperties(seckillProduct, vo);
        return vo;
    }

    @CacheEvict(key = "'selectByIdAndTime:' + #id")
    @Override
    public void decrStockCount(Long id) {
        int row = seckillProductMapper.decrStock(id);
        AssertUtils.isTrue(row > 0, "库存不足");
    }

    @Override
    public Long selectStockCountById(Long seckillId) {
        return seckillProductMapper.selectStockCountById(seckillId);
    }

    @Override
    public void incrStockCount(Long seckillId) {
        seckillProductMapper.incrStock(seckillId);
    }

    /**
     * 分布式锁总结:
     * 1. 利用 Redis SETNX 原子性命令实现分布式锁
     * 2. 未避免死锁, 加锁时引入超时机制, 如果出现意外宕机, 需要可以自动释放锁
     * 3. 未避免加锁操作与设置超时时间操作原子性问题, 使用 LUA 脚本保证多个命令执行的原子性
     * 4. 锁只能由加锁的线程释放, 不能被其他线程释放, 利用唯一线程 id 作为 value, 删除时判断 id 是否一致
     * 5. 未避免业务没执行完, 锁就过期了, 引入 watchdog 实现锁自动续期
     */
    @CacheEvict(key = "'selectByIdAndTime:' + #id")
    @Override
    public void decrStockCount(Long id, Integer time) {
        // 1. 锁哪个对象 => 锁指定场次下的指定商品
        // 2. 当多线程同时加锁时, 只能有一个线程加锁成功 => Redis 的 setnx 命令, 例如使用 setnx 秒杀场次+秒杀商品id xxxx
        // 3. 锁记录存在什么地方 => 利用 Redis 的 setnx 命令存储在 Redis 的 String 数据结构中
        // 4. 当线程获取不到锁的时候, 执行什么策略 => 阻塞/自旋等待(限制次数)/直接抛异常
        String key = "seckill:product:stockcount:" + time + ":" + id;
        String threadId = "";
        ScheduledFuture<?> future = null;
        try {
            // 如果自旋次数超过 5 次就抛出异常
            int count = 0;
            Boolean ret = false;
            int timeout = 5;
            do {
                // 生成分布式唯一线程 id
                threadId = IdGenerateUtil.get().nextId() + "";
                // 利用 LUA 脚本, 一次性发送 setnx 和设置超时时间的命令给 redis
                // ret = redisTemplate.opsForValue().setIfAbsent(key, "1");
                ret = redisTemplate.execute(redisScript, Collections.singletonList(key), threadId, timeout + "");
                if (ret != null && ret) {
                    break;
                }

                AssertUtils.isTrue((count++) < 5, "系统繁忙, 请稍后再试");
                // 避免 CPU 过于频繁
                Thread.sleep(20);
            } while (true);

            // 加锁成功, 创建 WatchDog 监听业务是否执行完成, 实现续期操作
            long delayTime = (long) (timeout * 0.8);
            String finalThreadId = threadId;
            System.out.println("[WatchDog 间隔时间]" + delayTime);
            future = scheduledExecutorService.scheduleAtFixedRate(
                    () -> {
                        // 1. 查询 Redis 中 key 是否存在, 如果存在, 就续期
                        String value = redisTemplate.opsForValue().get(key);
                        System.out.println("[WatchDog] ----------------------------- 执行 Redis 续期操作 threadId=" + finalThreadId + ", key=" + key + ", value=" + value);
                        if (finalThreadId.equals(value)) {
                            // 1.1 将当前 key 再次续期
                            redisTemplate.expire(key, delayTime + 2, TimeUnit.SECONDS);
                        }
                    },
                    delayTime,
                    delayTime,
                    TimeUnit.SECONDS
            );

            // 先查库存, 再扣库存
            Long stockCount = seckillProductMapper.selectStockCountById(id);
            AssertUtils.isTrue(stockCount > 0, "库存不足!");

            seckillProductMapper.decrStock(id);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (future != null) {
                future.cancel(true);
            }
            // 释放锁
            // 先获取到 value, 判断当前的 value 是否与 threadId 相同
            // 只有 id 相同才需要释放锁
            String value = redisTemplate.opsForValue().get(key);
            if (threadId.equals(value)) {
                redisTemplate.delete(key);
            }
        }
    }
}
